package com.xiaohu.transfrom;

import com.xiaohu.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
    DataStream是没有直接进行聚合的api的
    keyBy通过指定的键，可以将一条流从逻辑上划分成不同的分区，就是并行处理的子任务
    相同key的数据发向同一个分区
    内部默认是使用计算key的哈希值
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<WaterSensor> senorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s1", 4L, 4),
                new WaterSensor("s2", 5L, 5),
                new WaterSensor("s1", 6L, 6)
        );

        /*
            1、分组后的数据类型是KeyedStream类型，键控流类型
            2、而不是xxxxOperator类型，说明keyBy算子不是一个转换算子只是对数据进行重分区
            3、因为不是转换算子，所以不能设置并行度
            4、keyBy 分组和分区的关系：
                1）keyBy是对数据进行分组，保证相同的key的数据在同一个分组
                2）分区：一个子任务可以理解为一个分区
              keyBy是分组，保证相同的分组，到一个分区中，一个分区可以包含多个分组


         */
        KeyedStream<WaterSensor, String> sensorKS = senorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        sensorKS.print();


        env.execute();
    }
}
